Create DataFrame Using Parquet File

Apache Parquet is a columnar file format that provides optimizations to speed up queries, and it is a far more efficient format than CSV or JSON. It is supported by many data processing systems and is compatible with most frameworks in the Hadoop ecosystem. Parquet provides efficient data compression and encoding schemes with enhanced performance for handling complex data in bulk. Spark SQL supports both reading and writing Parquet files and automatically preserves the schema of the original data. Compared to plain-text formats, Parquet also reduces data storage substantially, often by around 75%.

These are some of the advantages of storing data in Parquet format. Spark supports Parquet by default in its library, so we don't need to add any dependency libraries.

Create DataFrame
val empDF = spark.createDataFrame(Seq(
      (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
    )).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")

Write DataFrame to Parquet File Format
Using the parquet() function of the DataFrameWriter class (accessed via df.write.parquet()), we can write a Spark DataFrame to a Parquet file. As mentioned earlier, Spark doesn't need any additional packages or libraries to use Parquet, as support ships with Spark by default.

empDF.write.parquet("/FileStore/tables/emp.parquet")

Writing a Spark DataFrame to Parquet format preserves the column names and data types, and all columns are automatically converted to nullable for compatibility reasons. Notice that all the part files Spark creates have the .parquet extension.
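As a quick check, you can read the file back and print each field's nullability; a minimal sketch reusing the path written above (the checkDF name is just illustrative):

// Read the Parquet output back and inspect each column:
// all fields are expected to be reported as nullable after the round trip.
val checkDF = spark.read.parquet("/FileStore/tables/emp.parquet")
checkDF.schema.fields.foreach { f =>
  println(s"${f.name}: ${f.dataType.simpleString} (nullable = ${f.nullable})")
}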

Read Parquet File into DataFrame
Similar to writing, DataFrameReader provides a parquet() function (spark.read.parquet) that reads Parquet files and creates a Spark DataFrame. Here we are reading the Apache Parquet file we wrote above.

val parqDF = spark.read.parquet("/FileStore/tables/emp.parquet")
parqDF.show

Printing the schema of the DataFrame returns columns with the same names and data types.

parqDF.printSchema

Appending to existing Parquet File
Spark provides the capability to append a DataFrame to existing Parquet files using the "append" save mode. If you want to overwrite instead, use the "overwrite" save mode.

empDF.write.mode("append").parquet("/FileStore/tables/emp.parquet")
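Similarly, to replace the existing files rather than append to them, a minimal sketch using the same path:

// Overwrite the existing Parquet output instead of appending to it.
empDF.write.mode("overwrite").parquet("/FileStore/tables/emp.parquet")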

Using SQL Queries on Parquet
We can also create a temporary view on Parquet files and then use it in Spark SQL statements. This temporary view is available as long as the SparkSession is active.

parqDF.createOrReplaceTempView("ParquetTable")

SQL Queries
spark.sql("select * from ParquetTable").show

spark.sql("select * from ParquetTable where sal >= 1500").show

The predicate above scans the entire Parquet file, which is a performance bottleneck, much like a full table scan on a traditional database. We should use partitioning to improve performance.
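To see this, you can inspect the physical plan of the query above; a minimal sketch using the view created earlier (the plan shows a file scan over the whole unpartitioned file, with the sal predicate applied as a pushed filter rather than a partition filter):

// Inspect the physical plan: the scan covers the entire (unpartitioned) file.
spark.sql("select * from ParquetTable where sal >= 1500").explain()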

Spark Parquet Partition - Improving Performance
Partitioning is a feature of many databases and data processing frameworks, and it is key to making jobs work at scale. We can partition a Parquet file using Spark's partitionBy() function.

empDF.write.parquet("/FileStore/tables/partemp.parquet")
empDF.write.partitionBy("job","sal").parquet("/FileStore/tables/emp3.parquet")

Parquet partitioning creates a folder hierarchy for each partition column; we have specified job as the first partition column followed by sal, hence it creates sal folders inside each job folder.
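For the data above, the resulting directory layout looks roughly like the following (actual part-file names will differ):

/FileStore/tables/emp3.parquet/job=ANALYST/sal=3000/part-<id>.snappy.parquet
/FileStore/tables/emp3.parquet/job=CLERK/sal=800/part-<id>.snappy.parquet
/FileStore/tables/emp3.parquet/job=CLERK/sal=1100/part-<id>.snappy.parquet
/FileStore/tables/emp3.parquet/job=MANAGER/sal=2450/part-<id>.snappy.parquet
...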

Read Parquet Partition
This is an example of how to read the Parquet file we partitioned on the job and sal columns above.

val parqDF = spark.read.parquet("/FileStore/tables/emp3.parquet")
parqDF.createOrReplaceTempView("ParquetTable1")

val df = spark.sql("select * from ParquetTable1  where job='MANAGER' and sal >= 1500")

The execution of this query is significantly faster than the query without partitioning. Spark prunes the data first on job and then applies the filter on sal.
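You can verify the pruning by inspecting the physical plan; the job and sal predicates should appear as partition filters in the scan (a minimal sketch):

// The plan should list job and sal under PartitionFilters,
// meaning only the matching folders are read.
df.explain()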

Spark Read a Specific Parquet Partition
spark.read.parquet("/FileStore/tables/emp3.parquet/job=MANAGER").show

This code snippet retrieves only the data in the job partition with the value "MANAGER".
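Note that reading a partition directory directly like this drops the job column from the result, since partition discovery starts below it. An alternative, sketched below, is to read the partitioned root and filter; Spark prunes to the matching folders and keeps the job column:

// Read the partitioned root and filter: partition pruning reads only job=MANAGER,
// and the job column is retained in the result.
spark.read.parquet("/FileStore/tables/emp3.parquet").where("job = 'MANAGER'").show()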

Complete Spark Parquet Example
package com.sparkbyexamples.spark.dataframe
import org.apache.spark.sql.SparkSession
object ParquetExample {
  def main(args:Array[String]):Unit= {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .getOrCreate()

    val data = Seq(("James ","","Smith","36636","M",3000),
      ("Michael ","Rose","","40288","M",4000),
      ("Robert ","","Williams","42114","M",4000),
      ("Maria ","Anne","Jones","39192","F",4000),
      ("Jen","Mary","Brown","","F",-1)
    )
    val columns = Seq("firstname","middlename","lastname","dob","gender","salary")
    import spark.implicits._
    val df = data.toDF(columns:_*)

    df.show()
    df.printSchema()

    df.write
      .parquet("/tmp/output/people.parquet")

    val parqDF = spark.read.parquet("/tmp/output/people.parquet")
    parqDF.createOrReplaceTempView("ParquetTable")

    spark.sql("select * from ParquetTable where salary >= 4000").explain()
    val parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

    parkSQL.show()
    parkSQL.printSchema()

    df.write
      .partitionBy("gender","salary")
      .parquet("/tmp/output/people2.parquet")

    val parqDF2 = spark.read.parquet("/tmp/output/people2.parquet")
    parqDF2.createOrReplaceTempView("ParquetTable2")

    val df3 = spark.sql("select * from ParquetTable2  where gender='M' and salary >= 4000")
    df3.explain()
    df3.printSchema()
    df3.show()

    val parqDF3 = spark.read
      .parquet("/tmp/output/people2.parquet/gender=M")
    parqDF3.show()

  }
}


Processing the Parquet Files
Apache Parquet is another columnar data format used by many tools in the Hadoop tool set for file I/O, such as Hive, Pig, and Impala. It increases performance by using efficient compression and encoding routines. The Parquet processing example is very similar to the JSON Scala code: the DataFrame is created and then saved in Parquet format using the save method with a source type of parquet:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

object sql1 {
  def main(args: Array[String]): Unit = {
    val appName = "Parquet example"
    val conf = new SparkConf()
    conf.setAppName(appName)
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Load the raw comma-separated data from HDFS
    val rawRdd = sc.textFile("hdfs:///data/spark/sql/adult.test.data_1x")

    // Build a schema of string columns from a space-separated list of field names
    val schemaString = "age workclass fnlwgt education " +
      "educational-num marital-status occupation relationship " +
      "race gender capital-gain capital-loss hours-per-week " +
      "native-country income"
    val schema = StructType(
      schemaString.split(" ").map(fieldName =>
        StructField(fieldName, StringType, true)))

    // Convert each line into a Row matching the schema
    val rowRDD = rawRdd.map(_.split(","))
      .map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8),
        p(9), p(10), p(11), p(12), p(13), p(14)))

    // Create the DataFrame and save it in Parquet format.
    // save(path, source) is the older Spark 1.x DataFrame API; in later versions
    // use adultDataFrame.write.format("parquet").save(path) instead.
    val adultDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    adultDataFrame.save("hdfs:///data/spark/sql/adult.parquet", "parquet")
  }
}
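Once written, the file can be read back into a DataFrame; a minimal sketch, assuming Spark 1.4 or later where the DataFrameReader API is available (the adultParquetDF name is just illustrative):

// Read the Parquet output back and verify that the schema round-trips.
val adultParquetDF = sqlContext.read.parquet("hdfs:///data/spark/sql/adult.parquet")
adultParquetDF.printSchema()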
